Serverless Frameworkで構築するStep Functions & AWS Glue Jobを用いたワークフロー
好物はインフラとフロントエンドのかじわらゆたかです。 タイトルだけでなんかお腹いっぱい感ありますが、説明したいことを詰め込んだらこの様になりました。
Serverless Frameworkのプラグインを用いたStep Functionsの構築ブログは坂巻が書いたエントリがあるのでそちらを参考にしてください。
今回のエントリーはこのStep Functionsの中でGlue Jobを動かしてみたといった内容になります。
今回の検証で使ったServerlessFrameworkとStepfunctionsのプラグインのバージョンは下記になります。
- serverless : 1.63.0
- serverless-step-functions : 2.17.1
Serverless Framework(AWS Cloudformation) で AWS Glue Jobを使うには
Serverless FrameworkやプラグインでGlueの対応があるわけではないので、Serverless.yml のresources 内にCloudformationの記法で記載していくことになります。
CloudFormationでGlueのジョブを定義する際には以下のようになります。
Type: Sparkの場合
ConvertParquetJob: Type: AWS::Glue::Job Properties: Role: !GetAtt ETLJobRole.Arn Name: convert-parquet Command: Name: glueetl ScriptLocation: !Sub "s3://${env:GlueScriptBucket}/script/convert_parquet.py" ExecutionProperty: MaxConcurrentRuns: 50 MaxRetries: 0
Type: Pythonの場合
PrepareFileJob: Type: AWS::Glue::Job Properties: Role: !GetAtt ETLJobRole.Arn Name: prepare-file Command: Name: pythonshell ScriptLocation: !Sub "s3://${env:GlueScriptBucket}/script/prepare_file.py" PythonVersion: "3" ExecutionProperty: MaxConcurrentRuns: 50 MaxRetries: 0
ドキュメントとしては、CloudFormationのGlue Jobの箇所そのままになります。
AWS::Glue::Job - AWS CloudFormation
記法として変わっているのはCommnadの箇所のNameでSparkかPythonShellの違いを記載しています。
glueetl
と書くとSparkJob、pythonshell
と書くとPythonのジョブとなります。
あと、上にも書いたとおりGlue JobはServerless Frameworkのサポートを受けているわけではないので、 ScriptLocationへのファイルの配置はServerless Frameworkのデプロイコマンドとは別にアップロードを行う必要があります。
この記事では、中の実装についての解説は行いません Glue Jobの実装についての解説は石川や大高の以下の記事を参考にしてみてください。
AWS Glue のジョブタイプ『Python Shell』が Python 3.6と互換性のあるスクリプトをサポートしました
ServerlessFramework のStepFunctions内でGlue Jobを呼び出す。
StepFunctions内でGlue Jobを呼び出す方法については下記のエントリを参考ください
JSONですでに書いているStateMachineがあるのであれば、 JSONをYamlに変換するツールで変換する事が可能です。
上記のブログエントリのGlue Jobを呼び出している箇所をJsonからYamlに変換すると以下のようになります。
--- StartAt: PutItem States: PutItem: Type: Task Resource: arn:aws:states:::dynamodb:putItem Parameters: TableName: test_glue_db Item: id: S: '1' ResultPath: "$.DynamoDB" Next: Glue StartJobRun Glue StartJobRun: Type: Task Resource: arn:aws:states:::glue:startJobRun.sync Parameters: JobName: test_job End: true
今回はDynamoDBに入れるのではなく、Python ShellのジョブからSparkのジョブを呼び出すようにしたいので以下のようになりました。
StartAt: PrepareFile States: PrepareFile: Type: Task Resource: arn:aws:states:::glue:startJobRun.sync Parameters: JobName: prepare-file Next: Glue StartJobRun Glue StartJobRun: Type: Task Resource: arn:aws:states:::glue:startJobRun.sync Parameters: JobName: prepare-file End: true
StepFunctionsのServerless.ymlとして以下のようになります。
service: serverless-glue-stepfunctions provider: name: aws runtime: nodejs12.x region: ap-northeast-1 plugins: - serverless-step-functions functions: Function1: handler: handler.hello stepFunctions: stateMachines: serverless-framework-stepfunctions-glue: name: sls-stepfunctions-glue definition: StartAt: PrepareFile States: PrepareFile: Type: Task Resource: arn:aws:states:::glue:startJobRun.sync Parameters: JobName: prepare-file Next: Glue StartJobRun Glue StartJobRun: Type: Task Resource: arn:aws:states:::glue:startJobRun.sync Parameters: JobName: convert-parquet End: true resources: Resources: ETLJobRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Action: - sts:AssumeRole Effect: Allow Principal: Service: - "glue.amazonaws.com" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Action: "*" Effect: Allow Resource: "*" PrepareFileJob: Type: AWS::Glue::Job Properties: Role: !GetAtt ETLJobRole.Arn Name: prepare-file Command: Name: pythonshell ScriptLocation: !Sub "s3://${env:GlueScriptBucket}/script/prepare_file.py" PythonVersion: "3" ExecutionProperty: MaxConcurrentRuns: 50 MaxRetries: 0 ConvertParquetJob: Type: AWS::Glue::Job Properties: Role: !GetAtt ETLJobRole.Arn Name: convert-parquet Command: Name: glueetl ScriptLocation: !Sub "s3://${env:GlueScriptBucket}/script/convert_parquet.py" ExecutionProperty: MaxConcurrentRuns: 50 MaxRetries: 0
あとは、ServerlessFramework のStepFunctions内でGlue Jobを呼び出す。の箇所で書いたとおり、GlueのJobのScriptはS3に配置する必要がありますので、以下のようなデプロイ用のスクリプトを用いて配置を行うと良いかと思います。
export GlueScriptBucket="YourBucketName" aws s3 cp ./script/ s3://${GlueScriptBucket}/script/ sls deploy
結論
ちょっと変則的案方法ではありますがこのような方法をとることで、ServerlessFramework 上でGlueのジョブの定義とStepfunctions の呼び出しを行うことが可能となります。